package cn.doitedu.rtdw.data_sync;

import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.Map;

/**
 * @Author: deep as the sea
 * @Site: <a href="www.51doit.com">多易教育</a>
 * @QQ: 657270652
 * @Date: 2023/2/8
 * @Desc: 学大数据，到多易教育
 *
 **/
public class S04_SyncJob_AdRequestLog {

    /** Interval between two checkpoints, in milliseconds. */
    private static final long CHECKPOINT_INTERVAL_MS = 2000;

    /** Location where checkpoint data is stored. */
    private static final String CHECKPOINT_STORAGE_URI = "file:/d:/checkpoint";

    /** Logical table that reads the ad-request feature log from the Kafka topic {@code ad_request_log}. */
    private static final String KAFKA_SOURCE_DDL =
            " CREATE TABLE ad_request_source (            "
            + "     requestTime bigint,                             "
            + "     platform STRING,                                "
            + "     pageId STRING,                                  "
            + "     locationType STRING,                            "
            + "     mediaType STRING,                               "
            + "     returnAdId STRING,                              "
            + "     creativeId STRING,                              "
            + "     modelType STRING,                               "
            + "     client MAP<STRING,STRING>,                      "
            + "     userFeatures MAP<STRING,DOUBLE>,                "
            + "     adFeatures MAP<STRING,DOUBLE>,                  "
            + "     ad_tracking_id STRING                           "
            + " ) WITH (                                            "
            + "   'connector' = 'kafka',                            "
            + "   'topic' = 'ad_request_log',                       "
            + "   'properties.bootstrap.servers' = 'doitedu:9092',  "
            + "   'properties.group.id' = 'testGroup',              "
            + "   'scan.startup.mode' = 'latest-offset',            "
            + "   'value.format'='json',                            "
            + "   'value.json.fail-on-missing-field'='false',       "
            + "   'value.fields-include' = 'EXCEPT_KEY')            ";

    /**
     * Logical table mapped onto the HBase table {@code ad_request_log}: row key
     * {@code ad_tracking_id}, all remaining columns inside the column family {@code f}.
     */
    private static final String HBASE_SINK_DDL =
            "CREATE TABLE ad_request_hbasesink( "
            + " ad_tracking_id STRING, "
            + " f ROW<requestTime bigint,platform STRING, pageId STRING, "
            + "   locationType STRING, mediaType STRING, returnAdId STRING, creativeId STRING, modelType STRING, "
            + "   client STRING, userFeatures STRING,adFeatures STRING>, "
            + " PRIMARY KEY (ad_tracking_id) NOT ENFORCED "
            + ") WITH (                             "
            + " 'connector' = 'hbase-2.2',          "
            + " 'table-name' = 'ad_request_log',     "
            + " 'zookeeper.quorum' = 'doitedu:2181' "
            + ")";

    /**
     * Temporary view {@code tmp} over the source table in which the three MAP columns
     * are converted to JSON strings by the {@code map_json} / {@code map_json2} functions.
     */
    private static final String JSON_VIEW_DDL =
            " CREATE TEMPORARY VIEW tmp AS (      "
            + " SELECT                                       "
            // The \t below is the tab character embedded in this line of the statement.
            + "     ad_tracking_id                \t        "
            + "     ,requestTime                             "
            + "     ,platform                                "
            + "     ,pageId                                  "
            + "     ,locationType                            "
            + "     ,mediaType                               "
            + "     ,returnAdId                              "
            + "     ,creativeId                              "
            + "     ,modelType                               "
            + "     ,map_json(client) as client              "
            + "     ,map_json2(userFeatures) as  userFeatures "
            + "     ,map_json2(adFeatures)   as  adFeatures   "
            + " FROM ad_request_source                       "
            + " )                                            ";

    /** Insert statement that packs the columns of {@code tmp} into the row {@code f} of the HBase sink. */
    private static final String HBASE_INSERT_DML =
            " INSERT INTO ad_request_hbasesink         "
            + " SELECT                                       "
            + "     ad_tracking_id                           "
            // The \t below is the tab character embedded in this line of the statement.
            + "     ,ROW(\t                                "
            + "        requestTime                           "
            + "        ,platform                             "
            + "        ,pageId                               "
            + "        ,locationType                         "
            + "        ,mediaType                            "
            + "        ,returnAdId                           "
            + "        ,creativeId                           "
            + "        ,modelType                            "
            + "        ,client                               "
            + "        ,userFeatures                         "
            + "        ,adFeatures                           "
            + "     ) as f                                   "
            + " FROM tmp                                     ";

    /**
     * Job entry point: declares the Kafka source and the HBase sink, registers the
     * map-to-JSON functions, builds the intermediate view and finally issues the
     * insert statement that copies the ad-request log into HBase.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(newStreamEnvironment());

        // Source and sink tables.
        tableEnv.executeSql(KAFKA_SOURCE_DDL);
        tableEnv.executeSql(HBASE_SINK_DDL);

        // Functions referenced by the view definition.
        tableEnv.createTemporaryFunction("map_json", Map2JsonString.class);
        tableEnv.createTemporaryFunction("map_json2", Map2JsonString2.class);

        // View first, then the insert statement that reads from it.
        tableEnv.executeSql(JSON_VIEW_DDL);
        tableEnv.executeSql(HBASE_INSERT_DML);
    }

    /**
     * Creates the stream execution environment with exactly-once checkpointing,
     * the checkpoint storage location and a parallelism of 1.
     *
     * @return the configured execution environment
     */
    private static StreamExecutionEnvironment newStreamEnvironment() {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.enableCheckpointing(CHECKPOINT_INTERVAL_MS, CheckpointingMode.EXACTLY_ONCE);
        streamEnv.getCheckpointConfig().setCheckpointStorage(CHECKPOINT_STORAGE_URI);
        streamEnv.setParallelism(1);
        return streamEnv;
    }

    /** Scalar function (registered as {@code map_json}) that serializes a string-to-string map to JSON text. */
    public static class Map2JsonString extends ScalarFunction {

        /**
         * @param map the map to serialize
         * @return the result of {@code JSON.toJSONString(map)}
         */
        public String eval(Map<String, String> map) {
            String json = JSON.toJSONString(map);
            return json;
        }
    }

    /** Scalar function (registered as {@code map_json2}) that serializes a string-to-double map to JSON text. */
    public static class Map2JsonString2 extends ScalarFunction {

        /**
         * @param map the map to serialize
         * @return the result of {@code JSON.toJSONString(map)}
         */
        public String eval(Map<String, Double> map) {
            String json = JSON.toJSONString(map);
            return json;
        }
    }

}
